"""
Test the pipeline module.
"""

import itertools
import re
import shutil
import time
from tempfile import mkdtemp

import joblib
import numpy as np
import pytest

from sklearn import config_context
from sklearn.base import (
    BaseEstimator,
    ClassifierMixin,
    TransformerMixin,
    clone,
    is_classifier,
    is_regressor,
)
from sklearn.cluster import KMeans
from sklearn.datasets import load_iris
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.dummy import DummyRegressor
from sklearn.ensemble import (
    HistGradientBoostingClassifier,
    RandomForestClassifier,
    RandomTreesEmbedding,
)
from sklearn.exceptions import NotFittedError, UnsetMetadataPassedError
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.impute import SimpleImputer
from sklearn.linear_model import Lasso, LinearRegression, LogisticRegression
from sklearn.metrics import accuracy_score, r2_score
from sklearn.model_selection import train_test_split
from sklearn.neighbors import LocalOutlierFactor
from sklearn.pipeline import FeatureUnion, Pipeline, make_pipeline, make_union
from sklearn.preprocessing import FunctionTransformer, StandardScaler
from sklearn.svm import SVC
from sklearn.tests.metadata_routing_common import (
    ConsumingNoFitTransformTransformer,
    ConsumingTransformer,
    _Registry,
    check_recorded_metadata,
)
from sklearn.utils import get_tags
from sklearn.utils._metadata_requests import COMPOSITE_METHODS, METHODS
from sklearn.utils._testing import (
    MinimalClassifier,
    MinimalRegressor,
    MinimalTransformer,
    assert_allclose,
    assert_array_almost_equal,
    assert_array_equal,
)
from sklearn.utils.fixes import CSR_CONTAINERS
from sklearn.utils.validation import _check_feature_names, check_is_fitted

# Load a shared dataset for the tests in this module. Mark its arrays
# read-only to avoid unintentional in-place modifications that would introduce
# side-effects between tests.
iris = load_iris()
iris.data.flags.writeable = False
iris.target.flags.writeable = False


# Tiny text corpus fed to the CountVectorizer-based FeatureUnion tests below.
JUNK_FOOD_DOCS = (
    "the pizza pizza beer copyright",
    "the pizza burger beer copyright",
    "the the pizza beer beer copyright",
    "the burger beer beer copyright",
    "the coke burger coke copyright",
    "the coke burger burger",
)


class NoFit(BaseEstimator):
    """Estimator stub without a ``fit`` method.

    It only stores two constructor parameters and is used to test parameter
    dispatching.
    """

    def __init__(self, a=None, b=None):
        self.a, self.b = a, b


class NoTrans(NoFit):
    """Stub that implements ``fit`` but not ``transform``.

    ``get_params`` and ``set_params`` are hand-written: only ``a`` can be
    updated through ``set_params``.
    """

    def fit(self, X, y=None):
        return self

    def get_params(self, deep=False):
        # ``deep`` is accepted for API compatibility but not used.
        return dict(a=self.a, b=self.b)

    def set_params(self, **params):
        # Only "a" is read from the given parameters.
        new_a = params["a"]
        self.a = new_a
        return self


class NoInvTransf(TransformerMixin, NoTrans):
    """Identity transformer that does not define ``inverse_transform``."""

    def transform(self, X):
        # Return the input unchanged.
        return X


class Transf(NoInvTransf):
    """Identity transformer that also provides ``inverse_transform``."""

    def transform(self, X):
        # Return the input unchanged.
        return X

    def inverse_transform(self, X):
        # Return the input unchanged.
        return X


class TransfFitParams(Transf):
    """Identity transformer whose ``fit`` records the keyword arguments it got."""

    def fit(self, X, y=None, **fit_params):
        # Keep the received fit parameters so tests can inspect them.
        self.fit_params = fit_params
        return self


class Mult(TransformerMixin, BaseEstimator):
    """Mock estimator that scales its input by the constant factor ``mult``.

    ``transform`` multiplies, ``inverse_transform`` divides, and the
    prediction-like methods return the row sums of the scaled input.
    """

    def __init__(self, mult=1):
        self.mult = mult

    def __sklearn_is_fitted__(self):
        # Always reported as fitted.
        return True

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        scaled = np.asarray(X) * self.mult
        return scaled

    def inverse_transform(self, X):
        restored = np.asarray(X) / self.mult
        return restored

    def predict(self, X):
        scaled = np.asarray(X) * self.mult
        return scaled.sum(axis=1)

    # All prediction-like methods share the implementation of ``predict``.
    predict_proba = predict
    predict_log_proba = predict
    decision_function = predict

    def score(self, X, y=None):
        return np.sum(X)


class FitParamT(BaseEstimator):
    """Mock classifier recording whether a fit parameter was forwarded.

    ``fit`` stores the value of ``should_succeed`` in ``successful``;
    ``predict`` returns that stored value. ``score`` returns the sum of ``X``,
    multiplied by ``sample_weight`` first when weights are given.
    """

    def __init__(self):
        self.successful = False

    def fit(self, X, y, should_succeed=False):
        """Record ``should_succeed`` and mark the estimator as fitted.

        Returns
        -------
        self : FitParamT
            The estimator itself, following the scikit-learn convention so
            that calls such as ``est.fit(X, y).predict(X)`` can be chained.
        """
        self.successful = should_succeed
        self.fitted_ = True
        return self

    def predict(self, X):
        """Return the flag stored by the last call to ``fit``."""
        return self.successful

    def fit_predict(self, X, y, should_succeed=False):
        """Fit with the given flag, then return the result of ``predict``."""
        self.fit(X, y, should_succeed=should_succeed)
        return self.predict(X)

    def score(self, X, y=None, sample_weight=None):
        """Return ``np.sum(X)``, applying ``sample_weight`` first if given."""
        if sample_weight is not None:
            X = X * sample_weight
        return np.sum(X)


class DummyTransf(Transf):
    """Transformer that stores the column means and the time of fitting."""

    def fit(self, X, y):
        column_means = np.mean(X, axis=0)
        self.means_ = column_means
        # The timestamp lets tests tell a fresh fit apart from a cached one.
        self.timestamp_ = time.time()
        return self


class DummyEstimatorParams(BaseEstimator):
    """Mock classifier whose prediction methods accept an extra parameter.

    Each prediction method stores ``got_attribute`` on the instance and
    returns the instance itself.
    """

    def __sklearn_is_fitted__(self):
        return True

    def fit(self, X, y):
        return self

    def _remember(self, got_attribute):
        # Shared implementation of every prediction-like method.
        self.got_attribute = got_attribute
        return self

    def predict(self, X, got_attribute=False):
        return self._remember(got_attribute)

    def predict_proba(self, X, got_attribute=False):
        return self._remember(got_attribute)

    def predict_log_proba(self, X, got_attribute=False):
        return self._remember(got_attribute)


def test_pipeline_invalid_parameters():
    """Check step validation, parameter access and cloning of a Pipeline."""
    # Test the various init parameters of the pipeline in fit
    # method
    pipeline = Pipeline([(1, 1)])
    with pytest.raises(TypeError):
        pipeline.fit([[1]], [1])

    # Check that we can't fit pipelines with objects without fit
    # method
    msg = (
        "Last step of Pipeline should implement fit "
        "or be the string 'passthrough'"
        ".*NoFit.*"
    )
    pipeline = Pipeline([("clf", NoFit())])
    with pytest.raises(TypeError, match=msg):
        pipeline.fit([[1]], [1])

    # Smoke test with only an estimator
    clf = NoTrans()
    pipe = Pipeline([("svc", clf)])
    assert pipe.get_params(deep=True) == dict(
        svc__a=None, svc__b=None, svc=clf, **pipe.get_params(deep=False)
    )

    # Check that params are set
    pipe.set_params(svc__a=0.1)
    assert clf.a == 0.1
    assert clf.b is None
    # Smoke test the repr:
    repr(pipe)

    # Test with two objects
    clf = SVC()
    filter1 = SelectKBest(f_classif)
    pipe = Pipeline([("anova", filter1), ("svc", clf)])

    # Check that estimators are not cloned on pipeline construction
    assert pipe.named_steps["anova"] is filter1
    assert pipe.named_steps["svc"] is clf

    # Check that we can't fit with non-transformers on the way
    # Note that NoTrans implements fit, but not transform
    msg = "All intermediate steps should be transformers.*\\bNoTrans\\b.*"
    pipeline = Pipeline([("t", NoTrans()), ("svc", clf)])
    with pytest.raises(TypeError, match=msg):
        pipeline.fit([[1]], [1])

    # Check that params are set
    pipe.set_params(svc__C=0.1)
    assert clf.C == 0.1
    # Smoke test the repr:
    repr(pipe)

    # Check that params are not set when naming them wrong
    msg = re.escape(
        "Invalid parameter 'C' for estimator SelectKBest(). Valid parameters are: ['k',"
        " 'score_func']."
    )
    with pytest.raises(ValueError, match=msg):
        pipe.set_params(anova__C=0.1)

    # Test clone
    pipe2 = clone(pipe)
    assert pipe.named_steps["svc"] is not pipe2.named_steps["svc"]

    # Check that apart from estimators, the parameters are the same
    params = pipe.get_params(deep=True)
    params2 = pipe2.get_params(deep=True)

    # Drop the pipeline's own (shallow) parameters from both dicts
    for x in pipe.get_params(deep=False):
        params.pop(x)

    for x in pipe2.get_params(deep=False):
        params2.pop(x)

    # Remove estimators that were copied
    params.pop("svc")
    params.pop("anova")
    params2.pop("svc")
    params2.pop("anova")
    assert params == params2


def test_empty_pipeline():
    """Fitting a pipeline without any step must raise a ValueError."""
    empty = Pipeline([])
    expected = "The pipeline is empty. Please add steps."
    with pytest.raises(ValueError, match=expected):
        empty.fit(iris.data, iris.target)


def test_pipeline_init_tuple():
    """Steps may be given as a tuple rather than a list."""
    data = np.array([[1, 2]])
    steps = (("transf", Transf()), ("clf", FitParamT()))
    pipe = Pipeline(steps)
    pipe.fit(data, y=None)
    pipe.score(data)

    # Same smoke check once the transformer is swapped for 'passthrough'
    pipe.set_params(transf="passthrough")
    pipe.fit(data, y=None)
    pipe.score(data)


def test_pipeline_methods_anova():
    """Smoke test the prediction API of an ANOVA + LogisticRegression pipeline."""
    features, labels = iris.data, iris.target
    pipe = Pipeline(
        [
            ("anova", SelectKBest(f_classif, k=2)),
            ("logistic", LogisticRegression()),
        ]
    )
    pipe.fit(features, labels)
    for method_name in ("predict", "predict_proba", "predict_log_proba"):
        getattr(pipe, method_name)(features)
    pipe.score(features, labels)


def test_pipeline_fit_params():
    """Step-prefixed fit parameters reach the matching step only."""
    pipe = Pipeline([("transf", Transf()), ("clf", FitParamT())])
    pipe.fit(X=None, y=None, clf__should_succeed=True)

    # The final estimator received the flag, so its prediction is truthy
    assert pipe.predict(None)

    # The transformer's own parameters were left untouched
    transformer = pipe.named_steps["transf"]
    assert transformer.a is None
    assert transformer.b is None

    # A fit parameter unknown to the step surfaces as a TypeError
    expected = re.escape("fit() got an unexpected keyword argument 'bad'")
    with pytest.raises(TypeError, match=expected):
        pipe.fit(None, None, clf__bad=True)


def test_pipeline_sample_weight_supported():
    """``Pipeline.score`` forwards ``sample_weight`` to the final estimator."""
    data = np.array([[1, 2]])
    pipe = Pipeline([("transf", Transf()), ("clf", FitParamT())])
    pipe.fit(data, y=None)

    # Without weights (omitted or explicitly None) the score is the plain sum
    unweighted_calls = (
        {},
        {"y": None},
        {"y": None, "sample_weight": None},
    )
    for score_kwargs in unweighted_calls:
        assert pipe.score(data, **score_kwargs) == 3

    # With weights the data is multiplied element-wise before summing
    weights = np.array([2, 3])
    assert pipe.score(data, sample_weight=weights) == 8


def test_pipeline_sample_weight_unsupported():
    """A ``None`` sample_weight is not forwarded; a real one raises if unsupported."""
    data = np.array([[1, 2]])
    pipe = Pipeline([("transf", Transf()), ("clf", Mult())])
    pipe.fit(data, y=None)

    for score_kwargs in ({}, {"sample_weight": None}):
        assert pipe.score(data, **score_kwargs) == 3

    # Mult.score has no sample_weight argument, so actual weights are rejected
    expected = re.escape("score() got an unexpected keyword argument 'sample_weight'")
    weights = np.array([2, 3])
    with pytest.raises(TypeError, match=expected):
        pipe.score(data, sample_weight=weights)


def test_pipeline_raise_set_params_error():
    """``set_params`` reports invalid names for the pipeline and nested steps."""
    pipe = Pipeline([("cls", LinearRegression())])

    # Unknown parameter at the pipeline level
    outer_error = re.escape(
        "Invalid parameter 'fake' for estimator Pipeline(steps=[('cls',"
        " LinearRegression())]). Valid parameters are: ['memory', 'steps',"
        " 'transform_input', 'verbose']."
    )
    with pytest.raises(ValueError, match=outer_error):
        pipe.set_params(fake="nope")

    # An unknown outer name in a compound parameter gives the same message
    with pytest.raises(ValueError, match=outer_error):
        pipe.set_params(fake__estimator="nope")

    # Unknown parameter of a valid nested step
    inner_error = re.escape(
        "Invalid parameter 'invalid_param' for estimator LinearRegression(). Valid"
        " parameters are: ['copy_X', 'fit_intercept', 'n_jobs', 'positive', 'tol']."
    )
    with pytest.raises(ValueError, match=inner_error):
        pipe.set_params(cls__invalid_param="nope")


def test_pipeline_methods_pca_svm():
    """Smoke test the prediction API of a PCA + SVC pipeline."""
    features, labels = iris.data, iris.target
    steps = [
        ("pca", PCA(svd_solver="full", n_components="mle", whiten=True)),
        ("svc", SVC(probability=True, random_state=0)),
    ]
    pipe = Pipeline(steps)
    pipe.fit(features, labels)
    for method_name in ("predict", "predict_proba", "predict_log_proba"):
        getattr(pipe, method_name)(features)
    pipe.score(features, labels)


def test_pipeline_score_samples_pca_lof():
    """``score_samples`` on a pipeline matches applying the steps by hand."""
    data = iris.data
    reducer = PCA(svd_solver="full", n_components="mle", whiten=True)
    detector = LocalOutlierFactor(novelty=True)
    pipe = Pipeline([("pca", reducer), ("lof", detector)])
    pipe.fit(data)

    # One score per sample
    n_samples = data.shape[0]
    assert pipe.score_samples(data).shape == (n_samples,)

    # Refit the two steps manually and compare the resulting scores
    reduced_train = reducer.fit_transform(data)
    detector.fit(reduced_train)
    pipeline_scores = pipe.score_samples(data)
    manual_scores = detector.score_samples(reducer.transform(data))
    assert_allclose(pipeline_scores, manual_scores)


def test_score_samples_on_pipeline_without_score_samples():
    """No ``score_samples`` on the pipeline when the last step lacks it."""
    features = np.array([[1], [2]])
    labels = np.array([1, 2])
    pipe = make_pipeline(LogisticRegression())
    pipe.fit(features, labels)

    pipeline_error = "'Pipeline' has no attribute 'score_samples'"
    estimator_error = "'LogisticRegression' object has no attribute 'score_samples'"
    with pytest.raises(AttributeError, match=pipeline_error) as exec_info:
        pipe.score_samples(features)

    # The pipeline-level error is chained from the estimator-level one
    cause = exec_info.value.__cause__
    assert isinstance(cause, AttributeError)
    assert estimator_error in str(cause)


def test_pipeline_methods_preprocessing_svm():
    """Check output shapes of a preprocessing + SVC pipeline."""
    features, labels = iris.data, iris.target
    n_samples = features.shape[0]
    n_classes = len(np.unique(labels))

    svc = SVC(probability=True, random_state=0, decision_function_shape="ovr")
    preprocessors = (
        StandardScaler(),
        PCA(n_components=2, svd_solver="randomized", whiten=True),
    )
    per_class_methods = ("predict_proba", "predict_log_proba", "decision_function")

    for preprocessing in preprocessors:
        pipe = Pipeline([("preprocess", preprocessing), ("svc", svc)])
        pipe.fit(features, labels)

        # One label per sample
        assert pipe.predict(features).shape == (n_samples,)

        # One column per class for the probabilistic / decision outputs
        for method_name in per_class_methods:
            output = getattr(pipe, method_name)(features)
            assert output.shape == (n_samples, n_classes)

        pipe.score(features, labels)


def test_fit_predict_on_pipeline():
    """``fit_predict`` on a pipeline equals scaling then clustering by hand."""
    data = iris.data

    # Reference result: run the two steps separately
    standalone_scaler = StandardScaler()
    standalone_km = KMeans(random_state=0, n_init="auto")
    expected_labels = standalone_km.fit_predict(standalone_scaler.fit_transform(data))

    # The pipeline gets its own estimator instances because it does not
    # clone the estimators it is constructed with
    pipe = Pipeline(
        [
            ("scaler", StandardScaler()),
            ("Kmeans", KMeans(random_state=0, n_init="auto")),
        ]
    )
    pipeline_labels = pipe.fit_predict(data)

    assert_array_almost_equal(pipeline_labels, expected_labels)


def test_fit_predict_on_pipeline_without_fit_predict():
    """No ``fit_predict`` on the pipeline when the last step lacks it."""
    pipe = Pipeline(
        [("scaler", StandardScaler()), ("pca", PCA(svd_solver="full"))]
    )

    pipeline_error = "'Pipeline' has no attribute 'fit_predict'"
    estimator_error = "'PCA' object has no attribute 'fit_predict'"
    with pytest.raises(AttributeError, match=pipeline_error) as exec_info:
        getattr(pipe, "fit_predict")

    # The pipeline-level error is chained from the estimator-level one
    cause = exec_info.value.__cause__
    assert isinstance(cause, AttributeError)
    assert estimator_error in str(cause)


def test_fit_predict_with_intermediate_fit_params():
    """``fit_predict`` routes prefixed fit params to intermediate steps too."""
    pipe = Pipeline([("transf", TransfFitParams()), ("clf", FitParamT())])
    routed_params = {"transf__should_get_this": True, "clf__should_succeed": True}
    pipe.fit_predict(X=None, y=None, **routed_params)

    received_by_transf = pipe.named_steps["transf"].fit_params
    assert received_by_transf["should_get_this"]
    assert pipe.named_steps["clf"].successful
    # The classifier's parameter must not leak into the transformer
    assert "should_succeed" not in received_by_transf


@pytest.mark.parametrize(
    "method_name", ["predict", "predict_proba", "predict_log_proba"]
)
def test_predict_methods_with_predict_params(method_name):
    """Extra keyword arguments of predict-like methods reach the last step."""
    final_estimator = DummyEstimatorParams()
    pipe = Pipeline([("transf", Transf()), ("clf", final_estimator)])
    pipe.fit(None, None)

    getattr(pipe, method_name)(X=None, got_attribute=True)

    assert pipe.named_steps["clf"].got_attribute


@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_feature_union(csr_container):
    """Sanity-check FeatureUnion on dense and sparse input.

    Also covers cloning, ``set_params``, transformers without
    ``fit_transform``, rejection of estimators without ``transform`` and
    tuple input for the transformer list.
    """
    # basic sanity check for feature union
    # Copy first: the shared iris arrays are read-only and X is centered in place
    X = iris.data.copy()
    X -= X.mean(axis=0)
    y = iris.target
    svd = TruncatedSVD(n_components=2, random_state=0)
    select = SelectKBest(k=1)
    fs = FeatureUnion([("svd", svd), ("select", select)])
    fs.fit(X, y)
    X_transformed = fs.transform(X)
    assert X_transformed.shape == (X.shape[0], 3)

    # check if it does the expected thing
    assert_array_almost_equal(X_transformed[:, :-1], svd.fit_transform(X))
    assert_array_equal(X_transformed[:, -1], select.fit_transform(X, y).ravel())

    # test if it also works for sparse input
    # NOTE(review): the same `svd` instance (created with random_state=0) is
    # reused in this second FeatureUnion; no new svd object is created here.
    fs = FeatureUnion([("svd", svd), ("select", select)])
    X_sp = csr_container(X)
    X_sp_transformed = fs.fit_transform(X_sp, y)
    assert_array_almost_equal(X_transformed, X_sp_transformed.toarray())

    # Test clone
    fs2 = clone(fs)
    assert fs.transformer_list[0][1] is not fs2.transformer_list[0][1]

    # test setting parameters
    fs.set_params(select__k=2)
    assert fs.fit_transform(X, y).shape == (X.shape[0], 4)

    # test it works with transformers missing fit_transform
    fs = FeatureUnion([("mock", Transf()), ("svd", svd), ("select", select)])
    X_transformed = fs.fit_transform(X, y)
    assert X_transformed.shape == (X.shape[0], 8)

    # test error if some elements do not support transform
    msg = "All estimators should implement fit and transform.*\\bNoTrans\\b"
    fs = FeatureUnion([("transform", Transf()), ("no_transform", NoTrans())])
    with pytest.raises(TypeError, match=msg):
        fs.fit(X)

    # test that init accepts tuples
    fs = FeatureUnion((("svd", svd), ("select", select)))
    fs.fit(X, y)


def test_feature_union_named_transformers():
    """``named_transformers`` supports both key and attribute access."""
    by_name = {"transf": Transf(), "noinvtransf": NoInvTransf()}
    fs = FeatureUnion(list(by_name.items()))

    for name, transformer in by_name.items():
        # dict-style lookup
        assert fs.named_transformers[name] == transformer
        # attribute-style lookup
        assert getattr(fs.named_transformers, name) == transformer


def test_make_union():
    """``make_union`` names each transformer after its lowercased class name."""
    pca, mock = PCA(svd_solver="full"), Transf()
    union = make_union(pca, mock)

    generated_names = tuple(name for name, _ in union.transformer_list)
    wrapped = tuple(transformer for _, transformer in union.transformer_list)
    assert generated_names == ("pca", "transf")
    assert wrapped == (pca, mock)


def test_make_union_kwargs():
    """``make_union`` accepts ``n_jobs`` and rejects unsupported keywords."""
    pca, mock = PCA(svd_solver="full"), Transf()

    with_jobs = make_union(pca, mock, n_jobs=3)
    default = make_union(pca, mock)
    assert with_jobs.transformer_list == default.transformer_list
    assert with_jobs.n_jobs == 3

    # Keywords that make_union does not support are reported as a TypeError
    expected = re.escape(
        "make_union() got an unexpected keyword argument 'transformer_weights'"
    )
    weights = {"pca": 10, "Transf": 1}
    with pytest.raises(TypeError, match=expected):
        make_union(pca, mock, transformer_weights=weights)


def create_mock_transformer(base_name, n_features=3):
    """Return a ``Transf`` whose feature names are ``base_name`` + index.

    The attached ``get_feature_names_out`` ignores its ``input_features``
    argument and always yields ``n_features`` names.
    """

    def get_feature_names_out(input_features):
        names = []
        for index in range(n_features):
            names.append(f"{base_name}{index}")
        return names

    transformer = Transf()
    transformer.get_feature_names_out = get_feature_names_out
    return transformer


def test_make_union_passes_verbose_feature_names_out():
    """``make_union`` hands ``verbose_feature_names_out`` to the FeatureUnion."""
    pca = PCA()
    mock = create_mock_transformer("transf")

    # Disabled: the flag is stored as given
    quiet_union = make_union(pca, mock, verbose_feature_names_out=False)
    assert not quiet_union.verbose_feature_names_out

    # Enabled: every output name is prefixed with its transformer's name
    verbose_union = make_union(pca, mock, verbose_feature_names_out=True)
    verbose_union.fit(iris.data, iris.target)

    expected_names = [f"pca__pca{i}" for i in range(4)]
    expected_names += [f"transf__transf{i}" for i in range(3)]
    assert_array_equal(expected_names, verbose_union.get_feature_names_out())


def test_pipeline_transform():
    """A pipeline ending in a transformer exposes (inverse_)transform."""
    data = iris.data
    pca = PCA(n_components=2, svd_solver="full")
    pipeline = Pipeline([("pca", pca)])

    # fit + transform, fit_transform and the bare estimator must all agree
    via_fit_then_transform = pipeline.fit(data).transform(data)
    via_fit_transform = pipeline.fit_transform(data)
    via_estimator = pca.fit_transform(data)
    assert_array_almost_equal(via_fit_then_transform, via_fit_transform)
    assert_array_almost_equal(via_fit_then_transform, via_estimator)

    # inverse_transform is delegated to the wrapped estimator as well
    restored_by_pipeline = pipeline.inverse_transform(via_fit_then_transform)
    restored_by_estimator = pca.inverse_transform(via_fit_then_transform)
    assert_array_almost_equal(restored_by_pipeline, restored_by_estimator)


def test_pipeline_fit_transform():
    """``fit_transform`` works when the last step only has fit and transform."""
    features, labels = iris.data, iris.target
    mock = Transf()
    pipeline = Pipeline([("mock", mock)])

    from_pipeline = pipeline.fit_transform(features, labels)
    from_estimator = mock.fit(features, labels).transform(features)
    assert_array_almost_equal(from_pipeline, from_estimator)


@pytest.mark.parametrize(
    "start, end", [(0, 1), (0, 2), (1, 2), (1, 3), (None, 1), (1, None), (None, None)]
)
def test_pipeline_slice(start, end):
    """Slicing a pipeline yields a sub-pipeline with the same settings."""
    full = Pipeline(
        [("transf1", Transf()), ("transf2", Transf()), ("clf", FitParamT())],
        memory="123",
        verbose=True,
    )
    window = slice(start, end)
    sub = full[window]

    # The slice is itself a Pipeline holding the selected steps
    assert isinstance(sub, Pipeline)
    assert sub.steps == full.steps[window]
    assert list(sub.named_steps.items()) == list(full.named_steps.items())[window]

    # Every shallow parameter other than "steps" is carried over
    full_params = full.get_params(deep=False)
    sub_params = sub.get_params(deep=False)
    full_params.pop("steps")
    sub_params.pop("steps")
    assert full_params == sub_params

    # Any slice step other than 1 is rejected
    with pytest.raises(
        ValueError, match="Pipeline slicing only supports a step of 1"
    ):
        full[start:end:-1]


def test_pipeline_index():
    """Steps can be retrieved by integer position or by name."""
    first, last = Transf(), FitParamT()
    pipe = Pipeline([("transf", first), ("clf", last)])

    for key in (0, "transf"):
        assert pipe[key] == first
    for key in (-1, "clf"):
        assert pipe[key] == last

    # An integer position outside the pipeline raises IndexError
    with pytest.raises(IndexError):
        pipe[3]

    # An unknown step name raises KeyError
    with pytest.raises(KeyError):
        pipe["foobar"]


def test_set_pipeline_steps():
    """Steps can be replaced by attribute assignment or via ``set_params``."""
    first, second = Transf(), Transf()
    pipeline = Pipeline([("mock", first)])
    assert pipeline.named_steps["mock"] is first

    # Assigning the ``steps`` attribute directly
    replacement = [("mock2", second)]
    pipeline.steps = replacement
    assert "mock" not in pipeline.named_steps
    assert pipeline.named_steps["mock2"] is second
    assert [("mock2", second)] == pipeline.steps

    # Replacing the whole list through set_params
    pipeline.set_params(steps=[("mock", first)])
    assert [("mock", first)] == pipeline.steps

    # Replacing one step by its name through set_params
    pipeline.set_params(mock=second)
    assert [("mock", second)] == pipeline.steps

    # Invalid steps are accepted by set_params and only rejected on use
    pipeline.set_params(steps=[("junk", ())])
    fit_error = re.escape(
        "Last step of Pipeline should implement fit or be the string 'passthrough'."
    )
    with pytest.raises(TypeError, match=fit_error):
        pipeline.fit([[1]], [1])

    attribute_error = "This 'Pipeline' has no attribute 'fit_transform'"
    with pytest.raises(AttributeError, match=attribute_error):
        pipeline.fit_transform([[1]], [1])


def test_pipeline_named_steps():
    """``named_steps`` offers attribute access unless the name clashes with dict."""
    identity = Transf()
    doubler = Mult(mult=2)

    # Regular step names are reachable as attributes of the bunch
    named = Pipeline([("mock", identity), ("mult", doubler)]).named_steps
    assert "mock" in named
    assert "mock2" not in named
    assert named.mock is identity
    assert named.mult is doubler

    # A step called "values" collides with the dict attribute of that name,
    # so attribute access does not return the estimator
    clashing = Pipeline([("values", identity), ("mult", doubler)])
    assert clashing.named_steps.values is not identity
    assert clashing.named_steps.mult is doubler


@pytest.mark.parametrize("passthrough", [None, "passthrough"])
def test_pipeline_correctly_adjusts_steps(passthrough):
    """Fitting keeps every step name, including a skipped step, in order."""
    data = np.array([[1]])
    target = np.array([1])

    step_names = ["m2", "bad", "m3", "m5"]
    estimators = [Mult(mult=2), passthrough, Mult(mult=3), Mult(mult=5)]
    pipeline = Pipeline(list(zip(step_names, estimators)))

    pipeline.fit(data, target)

    names_after_fit = [step[0] for step in pipeline.steps]
    assert step_names == names_after_fit


@pytest.mark.parametrize("passthrough", [None, "passthrough"])
def test_set_pipeline_step_passthrough(passthrough):
    """Check pipelines in which steps are replaced by ``passthrough``.

    With ``Mult`` steps the expected output is the product of the multipliers
    of the steps that are still active, so each ``exp`` below states which
    steps are expected to run.
    """
    X = np.array([[1]])
    y = np.array([1])
    mult2 = Mult(mult=2)
    mult3 = Mult(mult=3)
    mult5 = Mult(mult=5)

    # Build a fresh pipeline that reuses the three Mult instances above
    def make():
        return Pipeline([("m2", mult2), ("m3", mult3), ("last", mult5)])

    pipeline = make()

    # all three steps are active
    exp = 2 * 3 * 5
    assert_array_equal([[exp]], pipeline.fit_transform(X, y))
    assert_array_equal([exp], pipeline.fit(X).predict(X))
    assert_array_equal(X, pipeline.inverse_transform([[exp]]))

    # middle step disabled
    pipeline.set_params(m3=passthrough)
    exp = 2 * 5
    assert_array_equal([[exp]], pipeline.fit_transform(X, y))
    assert_array_equal([exp], pipeline.fit(X).predict(X))
    assert_array_equal(X, pipeline.inverse_transform([[exp]]))
    # the disabled step is reported as-is and has no nested parameters
    assert pipeline.get_params(deep=True) == {
        "steps": pipeline.steps,
        "m2": mult2,
        "m3": passthrough,
        "last": mult5,
        "memory": None,
        "m2__mult": 2,
        "last__mult": 5,
        "transform_input": None,
        "verbose": False,
    }

    # first and middle steps disabled
    pipeline.set_params(m2=passthrough)
    exp = 5
    assert_array_equal([[exp]], pipeline.fit_transform(X, y))
    assert_array_equal([exp], pipeline.fit(X).predict(X))
    assert_array_equal(X, pipeline.inverse_transform([[exp]]))

    # for other methods, ensure no AttributeErrors on None:
    other_methods = [
        "predict_proba",
        "predict_log_proba",
        "decision_function",
        "transform",
        "score",
    ]
    for method in other_methods:
        getattr(pipeline, method)(X)

    # first step restored, middle step still disabled
    pipeline.set_params(m2=mult2)
    exp = 2 * 5
    assert_array_equal([[exp]], pipeline.fit_transform(X, y))
    assert_array_equal([exp], pipeline.fit(X).predict(X))
    assert_array_equal(X, pipeline.inverse_transform([[exp]]))

    pipeline = make()
    pipeline.set_params(last=passthrough)
    # mult2 and mult3 are active
    exp = 6
    assert_array_equal([[exp]], pipeline.fit(X, y).transform(X))
    assert_array_equal([[exp]], pipeline.fit_transform(X, y))
    assert_array_equal(X, pipeline.inverse_transform([[exp]]))

    # with the final step disabled, accessing `predict` must raise
    inner_msg = "'str' object has no attribute 'predict'"
    outer_msg = "This 'Pipeline' has no attribute 'predict'"
    with pytest.raises(AttributeError, match=outer_msg) as exec_info:
        getattr(pipeline, "predict")
    assert isinstance(exec_info.value.__cause__, AttributeError)
    assert inner_msg in str(exec_info.value.__cause__)

    # Check 'passthrough' step at construction time
    exp = 2 * 5
    pipeline = Pipeline([("m2", mult2), ("m3", passthrough), ("last", mult5)])
    assert_array_equal([[exp]], pipeline.fit_transform(X, y))
    assert_array_equal([exp], pipeline.fit(X).predict(X))
    assert_array_equal(X, pipeline.inverse_transform([[exp]]))


def test_pipeline_ducktyping():
    """The pipeline only exposes the methods its steps can support."""
    # Final step offering predict, transform and inverse_transform
    pipeline = make_pipeline(Mult(5))
    for method_name in ("predict", "transform", "inverse_transform"):
        assert hasattr(pipeline, method_name)

    # Plain transformer: no predict
    pipeline = make_pipeline(Transf())
    assert not hasattr(pipeline, "predict")
    assert hasattr(pipeline, "transform")
    assert hasattr(pipeline, "inverse_transform")

    # A lone 'passthrough' step behaves like an invertible transformer
    pipeline = make_pipeline("passthrough")
    assert pipeline.steps[0] == ("passthrough", "passthrough")
    assert not hasattr(pipeline, "predict")
    assert hasattr(pipeline, "transform")
    assert hasattr(pipeline, "inverse_transform")

    # One non-invertible step, in either position, removes inverse_transform
    for steps in ((Transf(), NoInvTransf()), (NoInvTransf(), Transf())):
        pipeline = make_pipeline(*steps)
        assert not hasattr(pipeline, "predict")
        assert hasattr(pipeline, "transform")
        assert not hasattr(pipeline, "inverse_transform")


def test_make_pipeline():
    """``make_pipeline`` derives step names and numbers duplicated ones."""
    first, second = Transf(), Transf()

    two_steps = make_pipeline(first, second)
    assert isinstance(two_steps, Pipeline)
    assert [name for name, _ in two_steps.steps] == ["transf-1", "transf-2"]

    three_steps = make_pipeline(first, second, FitParamT())
    assert isinstance(three_steps, Pipeline)
    assert [name for name, _ in three_steps.steps] == [
        "transf-1",
        "transf-2",
        "fitparamt",
    ]


@pytest.mark.parametrize(
    "pipeline, check_estimator_type",
    [
        pytest.param(
            make_pipeline(StandardScaler(), LogisticRegression()), is_classifier
        ),
        pytest.param(
            make_pipeline(StandardScaler(), LinearRegression()), is_regressor
        ),
        pytest.param(
            make_pipeline(StandardScaler()),
            lambda est: get_tags(est).estimator_type is None,
        ),
        pytest.param(
            Pipeline([]),
            lambda est: get_tags(est).estimator_type is None,
        ),
    ],
)
def test_pipeline_estimator_type(pipeline, check_estimator_type):
    """Check the estimator type reported for a pipeline.

    Each case pairs a pipeline with a predicate that must hold for it.

    Non-regression test as part of:
    https://github.com/scikit-learn/scikit-learn/issues/30197
    """
    # Building the repr must not fail either
    repr(pipeline)
    assert check_estimator_type(pipeline)


def test_sklearn_tags_with_empty_pipeline():
    """An empty Pipeline reports the same tags as a bare ``BaseEstimator``.

    Non-regression test as part of:
    https://github.com/scikit-learn/scikit-learn/issues/30197
    """
    pipeline_tags = Pipeline(steps=[]).__sklearn_tags__()
    reference_tags = BaseEstimator().__sklearn_tags__()
    assert pipeline_tags == reference_tags


def test_feature_union_weights():
    """Check that ``transformer_weights`` scales the weighted transformer's output."""
    # test feature union with transformer weights
    X = iris.data
    y = iris.target
    pca = PCA(n_components=2, svd_solver="randomized", random_state=0)
    select = SelectKBest(k=1)
    # test using fit followed by transform
    fs = FeatureUnion(
        [("pca", pca), ("select", select)], transformer_weights={"pca": 10}
    )
    fs.fit(X, y)
    X_transformed = fs.transform(X)
    # test using fit_transform
    fs = FeatureUnion(
        [("pca", pca), ("select", select)], transformer_weights={"pca": 10}
    )
    X_fit_transformed = fs.fit_transform(X, y)
    # test it works with transformers missing fit_transform
    fs = FeatureUnion(
        [("mock", Transf()), ("pca", pca), ("select", select)],
        transformer_weights={"mock": 10},
    )
    X_fit_transformed_wo_method = fs.fit_transform(X, y)
    # check against expected result

    # NOTE(review): the same `pca` instance (created with random_state=0) is
    # refit for the reference values below; no new pca object is created.
    assert_array_almost_equal(X_transformed[:, :-1], 10 * pca.fit_transform(X))
    assert_array_equal(X_transformed[:, -1], select.fit_transform(X, y).ravel())
    assert_array_almost_equal(X_fit_transformed[:, :-1], 10 * pca.fit_transform(X))
    assert_array_equal(X_fit_transformed[:, -1], select.fit_transform(X, y).ravel())
    assert X_fit_transformed_wo_method.shape == (X.shape[0], 7)


# TODO: remove mark once loky bug is fixed:
# https://github.com/joblib/loky/issues/458
@pytest.mark.thread_unsafe
def test_feature_union_parallel():
    """FeatureUnion gives the same output whether or not ``n_jobs`` is set."""
    docs = JUNK_FOOD_DOCS

    def build_union(**union_kwargs):
        # Word-level and character-level counts, side by side
        return FeatureUnion(
            [
                ("words", CountVectorizer(analyzer="word")),
                ("chars", CountVectorizer(analyzer="char")),
            ],
            **union_kwargs,
        )

    sequential = build_union()
    parallel_fit = build_union(n_jobs=2)
    parallel_fit_transform = build_union(n_jobs=2)

    # Sequential reference result
    sequential.fit(docs)
    reference = sequential.transform(docs)
    assert reference.shape[0] == len(docs)

    # fit followed by transform with n_jobs=2
    parallel_fit.fit(docs)
    parallel_result = parallel_fit.transform(docs)
    assert reference.shape == parallel_result.shape
    assert_array_equal(reference.toarray(), parallel_result.toarray())

    # fit_transform should behave the same
    combined_result = parallel_fit_transform.fit_transform(docs)
    assert_array_equal(reference.toarray(), combined_result.toarray())

    # transformers should stay fit after fit_transform
    combined_result = parallel_fit_transform.transform(docs)
    assert_array_equal(reference.toarray(), combined_result.toarray())


def test_feature_union_feature_names():
    """Check the names returned by ``FeatureUnion.get_feature_names_out``."""
    word_vect = CountVectorizer(analyzer="word")
    char_vect = CountVectorizer(analyzer="char_wb", ngram_range=(3, 3))
    union = FeatureUnion([("chars", char_vect), ("words", word_vect)])
    union.fit(JUNK_FOOD_DOCS)

    # Every output name carries the prefix of one of the two transformers.
    names = union.get_feature_names_out()
    assert all("chars__" in name or "words__" in name for name in names)
    assert len(names) == 35

    # A transformer lacking get_feature_names_out makes the union raise.
    union = FeatureUnion([("tr1", Transf())]).fit([[1]])
    expected_error = re.escape(
        "Transformer tr1 (type Transf) does not provide get_feature_names_out"
    )
    with pytest.raises(AttributeError, match=expected_error):
        union.get_feature_names_out()


def test_classes_property():
    """Check when ``classes_`` can be read from a pipeline."""
    X, y = iris.data, iris.target

    # Pipeline ending in a regressor: no classes_, even once fitted.
    regression_pipe = make_pipeline(SelectKBest(k=1), LinearRegression())
    regression_pipe.fit(X, y)
    with pytest.raises(AttributeError):
        getattr(regression_pipe, "classes_")

    # Pipeline ending in a classifier: classes_ only exists after fitting.
    classification_pipe = make_pipeline(
        SelectKBest(k=1), LogisticRegression(random_state=0)
    )
    with pytest.raises(AttributeError):
        getattr(classification_pipe, "classes_")
    classification_pipe.fit(X, y)
    assert_array_equal(classification_pipe.classes_, np.unique(y))


def test_set_feature_union_steps():
    """Check that replacing the transformers of a FeatureUnion takes effect."""

    def named_mult(factor):
        # Mult instance reporting a single output feature named "x<factor>".
        mult = Mult(factor)
        mult.get_feature_names_out = lambda input_features: [f"x{factor}"]
        return mult

    mult2, mult3, mult5 = named_mult(2), named_mult(3), named_mult(5)
    X = np.asarray([[1]])

    union = FeatureUnion([("m2", mult2), ("m3", mult3)])
    assert_array_equal([[2, 3]], union.transform(X))
    assert_array_equal(["m2__x2", "m3__x3"], union.get_feature_names_out())

    # Directly setting attr
    union.transformer_list = [("m5", mult5)]
    assert_array_equal([[5]], union.transform(X))
    assert_array_equal(["m5__x5"], union.get_feature_names_out())

    # Using set_params
    union.set_params(transformer_list=[("mock", mult3)])
    assert_array_equal([[3]], union.transform(X))
    assert_array_equal(["mock__x3"], union.get_feature_names_out())

    # Using set_params to replace single step
    union.set_params(mock=mult5)
    assert_array_equal([[5]], union.transform(X))
    assert_array_equal(["mock__x5"], union.get_feature_names_out())


def test_set_feature_union_step_drop():
    """Check that a transformer set to ``"drop"`` is left out of the output."""

    def named_mult(factor):
        # Mult instance reporting a single output feature named "x<factor>".
        mult = Mult(factor)
        mult.get_feature_names_out = lambda input_features: [f"x{factor}"]
        return mult

    def fitted_outputs(union, data):
        # Yield the result of fit().transform(), then of fit_transform().
        yield union.fit(data).transform(data)
        yield union.fit_transform(data)

    mult2, mult3 = named_mult(2), named_mult(3)
    X = np.asarray([[1]])

    union = FeatureUnion([("m2", mult2), ("m3", mult3)])
    for output in fitted_outputs(union, X):
        assert_array_equal([[2, 3]], output)
    assert_array_equal(["m2__x2", "m3__x3"], union.get_feature_names_out())

    # Drop the first transformer.
    union.set_params(m2="drop")
    for output in fitted_outputs(union, X):
        assert_array_equal([[3]], output)
    assert_array_equal(["m3__x3"], union.get_feature_names_out())

    # Drop the remaining transformer as well: the output has no columns.
    union.set_params(m3="drop")
    for output in fitted_outputs(union, X):
        assert_array_equal([[]], output)
    assert_array_equal([], union.get_feature_names_out())

    # check we can change back
    union.set_params(m3=mult3)
    assert_array_equal([[3]], union.fit(X).transform(X))

    # Check 'drop' step at construction time
    union = FeatureUnion([("m2", "drop"), ("m3", mult3)])
    for output in fitted_outputs(union, X):
        assert_array_equal([[3]], output)
    assert_array_equal(["m3__x3"], union.get_feature_names_out())


def test_set_feature_union_passthrough():
    """Check the behaviour of setting a transformer to `"passthrough"`."""

    def fitted_outputs(union, data):
        # Yield the result of fit().transform(), then of fit_transform().
        yield union.fit(data).transform(data)
        yield union.fit_transform(data)

    mult2, mult3 = Mult(2), Mult(3)

    # We only test get_feature_names_out, as get_feature_names is unsupported by
    # FunctionTransformer, and hence unsupported by FeatureUnion passthrough.
    mult2.get_feature_names_out = lambda input_features: ["x2"]
    mult3.get_feature_names_out = lambda input_features: ["x3"]

    X = np.asarray([[1]])

    ft = FeatureUnion([("m2", mult2), ("m3", mult3)])
    for output in fitted_outputs(ft, X):
        assert_array_equal([[2, 3]], output)
    assert_array_equal(["m2__x2", "m3__x3"], ft.get_feature_names_out())

    # Names expected whenever only "m2" is a passthrough.
    names_m2_passthrough = ["m2__myfeat", "m3__x3"]

    ft.set_params(m2="passthrough")
    for output in fitted_outputs(ft, X):
        assert_array_equal([[1, 3]], output)
    assert_array_equal(names_m2_passthrough, ft.get_feature_names_out(["myfeat"]))

    ft.set_params(m3="passthrough")
    for output in fitted_outputs(ft, X):
        assert_array_equal([[1, 1]], output)
    assert_array_equal(
        ["m2__myfeat", "m3__myfeat"], ft.get_feature_names_out(["myfeat"])
    )

    # check we can change back
    ft.set_params(m3=mult3)
    for output in fitted_outputs(ft, X):
        assert_array_equal([[1, 3]], output)
    assert_array_equal(names_m2_passthrough, ft.get_feature_names_out(["myfeat"]))

    # Check 'passthrough' step at construction time
    ft = FeatureUnion([("m2", "passthrough"), ("m3", mult3)])
    for output in fitted_outputs(ft, X):
        assert_array_equal([[1, 3]], output)
    assert_array_equal(names_m2_passthrough, ft.get_feature_names_out(["myfeat"]))

    # Second half: same checks on the iris data, combined with a PCA.
    X = iris.data
    columns = X.shape[1]
    input_names = ["f0", "f1", "f2", "f3"]
    leading_columns = slice(None, columns)
    trailing_columns = slice(-columns, None)
    pca = PCA(n_components=2, svd_solver="randomized", random_state=0)

    names_passthrough_then_pca = [
        "passthrough__f0",
        "passthrough__f1",
        "passthrough__f2",
        "passthrough__f3",
        "pca__pca0",
        "pca__pca1",
    ]

    ft = FeatureUnion([("passthrough", "passthrough"), ("pca", pca)])
    for output in fitted_outputs(ft, X):
        assert_array_equal(X, output[:, leading_columns])
    assert_array_equal(
        names_passthrough_then_pca, ft.get_feature_names_out(input_names)
    )

    # Both transformers are passthrough: the input appears twice.
    ft.set_params(pca="passthrough")
    for X_ft in fitted_outputs(ft, X):
        assert_array_equal(X_ft, np.hstack([X, X]))
    assert_array_equal(
        [
            "passthrough__f0",
            "passthrough__f1",
            "passthrough__f2",
            "passthrough__f3",
            "pca__f0",
            "pca__f1",
            "pca__f2",
            "pca__f3",
        ],
        ft.get_feature_names_out(input_names),
    )

    # Swap roles: the step named "passthrough" is now the PCA, while the step
    # named "pca" is still a passthrough, so the raw input is at the end.
    ft.set_params(passthrough=pca)
    for output in fitted_outputs(ft, X):
        assert_array_equal(X, output[:, trailing_columns])
    assert_array_equal(
        [
            "passthrough__pca0",
            "passthrough__pca1",
            "pca__f0",
            "pca__f1",
            "pca__f2",
            "pca__f3",
        ],
        ft.get_feature_names_out(input_names),
    )

    # A weight on the passthrough step scales the forwarded input.
    ft = FeatureUnion(
        [("passthrough", "passthrough"), ("pca", pca)],
        transformer_weights={"passthrough": 2},
    )
    for output in fitted_outputs(ft, X):
        assert_array_equal(X * 2, output[:, leading_columns])
    assert_array_equal(
        names_passthrough_then_pca, ft.get_feature_names_out(input_names)
    )


def test_feature_union_passthrough_get_feature_names_out_true():
    """Check feature_names_out for verbose_feature_names_out=True (default)"""
    pca = PCA(n_components=2, svd_solver="randomized", random_state=0)
    union = FeatureUnion([("pca", pca), ("passthrough", "passthrough")])
    union.fit(iris.data)

    # With the default setting every name is prefixed by its transformer name.
    expected_names = [
        "pca__pca0",
        "pca__pca1",
        "passthrough__x0",
        "passthrough__x1",
        "passthrough__x2",
        "passthrough__x3",
    ]
    assert_array_equal(expected_names, union.get_feature_names_out())


def test_feature_union_passthrough_get_feature_names_out_false():
    """Check feature_names_out for verbose_feature_names_out=False"""
    pca = PCA(n_components=2, svd_solver="randomized", random_state=0)
    union = FeatureUnion(
        [("pca", pca), ("passthrough", "passthrough")], verbose_feature_names_out=False
    )
    union.fit(iris.data)

    # Without verbose names no transformer prefix is added.
    expected_names = ["pca0", "pca1", "x0", "x1", "x2", "x3"]
    assert_array_equal(expected_names, union.get_feature_names_out())


def test_feature_union_passthrough_get_feature_names_out_false_errors():
    """Check get_feature_names_out and non-verbose names and colliding names."""
    pd = pytest.importorskip("pandas")
    frame = pd.DataFrame([[1, 2], [2, 3]], columns=["a", "b"])

    def keep_column_a(X):
        return X[["a"]]

    def column_a_name(transformer, _):
        return np.asarray(["a"])

    # Both transformers emit a feature named "a", which collides once the
    # transformer prefixes are turned off.
    select_a = FunctionTransformer(keep_column_a, feature_names_out=column_a_name)
    union = FeatureUnion(
        [("t1", StandardScaler()), ("t2", select_a)],
        verbose_feature_names_out=False,
    )
    union.fit(frame)

    expected_error = re.escape(
        "Output feature names: ['a'] are not unique. "
        "Please set verbose_feature_names_out=True to add prefixes to feature names"
    )
    with pytest.raises(ValueError, match=expected_error):
        union.get_feature_names_out()


def test_feature_union_passthrough_get_feature_names_out_false_errors_overlap_over_5():
    """Check get_feature_names_out with non-verbose names and >= 5 colliding names."""
    pd = pytest.importorskip("pandas")
    column_names = [f"f{i}" for i in range(10)]
    frame = pd.DataFrame([list(range(10))], columns=column_names)

    # Two passthrough steps duplicate all ten column names.
    union = FeatureUnion(
        [("t1", "passthrough"), ("t2", "passthrough")],
        verbose_feature_names_out=False,
    )
    union.fit(frame)

    # Only the first five colliding names are expected in the message.
    expected_error = re.escape(
        "Output feature names: ['f0', 'f1', 'f2', 'f3', 'f4', ...] "
        "are not unique. Please set verbose_feature_names_out=True to add prefixes to"
        " feature names"
    )
    with pytest.raises(ValueError, match=expected_error):
        union.get_feature_names_out()


def test_step_name_validation():
    """Check that invalid step names are rejected by Pipeline and FeatureUnion.

    Three kinds of invalid names are covered (containing ``__``, duplicated,
    clashing with a constructor argument), each injected through the
    constructor, through ``setattr`` and through ``set_params``.
    """
    error_message_1 = r"Estimator names must not contain __: got \['a__q'\]"
    error_message_2 = r"Names provided are not unique: \['a', 'a'\]"
    error_message_3 = r"Estimator names conflict with constructor arguments: \['%s'\]"
    steps_with_dunder = [("a__q", Mult(2)), ("b", Mult(3))]
    steps_with_duplicate = [("a", Mult(2)), ("a", Mult(3))]

    def assert_fitting_raises(est, message):
        # Both fit and fit_transform must reject the invalid names.
        with pytest.raises(ValueError, match=message):
            est.fit([[1]], [1])

        with pytest.raises(ValueError, match=message):
            est.fit_transform([[1]], [1])

    for cls, param in [(Pipeline, "steps"), (FeatureUnion, "transformer_list")]:
        # we validate in construction (despite scikit-learn convention)
        steps_with_param_name = [("a", Mult(2)), (param, Mult(3))]
        invalid_cases = [
            (steps_with_dunder, error_message_1),
            (steps_with_duplicate, error_message_2),
            (steps_with_param_name, error_message_3 % param),
        ]
        for bad_steps, message in invalid_cases:
            # three ways to make invalid:
            # - construction
            with pytest.raises(ValueError, match=message):
                cls(**{param: bad_steps}).fit([[1]], [1])

            # - setattr
            est = cls(**{param: [("a", Mult(1))]})
            setattr(est, param, bad_steps)
            assert_fitting_raises(est, message)

            # - set_params
            est = cls(**{param: [("a", Mult(1))]})
            est.set_params(**{param: bad_steps})
            assert_fitting_raises(est, message)


def test_set_params_nested_pipeline():
    """Smoke test: set a nested step and a parameter of it in one call."""
    inner_pipeline = Pipeline([("b", DummyRegressor())])
    estimator = Pipeline([("a", inner_pipeline)])

    # Replace step "b" and set a parameter of the replacement together.
    estimator.set_params(a__b__alpha=0.001, a__b=Lasso())
    # Replace the whole inner step list and set a parameter of the new step.
    estimator.set_params(a__steps=[("b", LogisticRegression())], a__b__C=5)


def test_pipeline_memory():
    """Check that a Pipeline created with ``memory`` caches its transformer.

    A cached and a non-cached pipeline are fitted on the same data and must
    agree on every prediction method and on the transformer's ``means_``.
    The ``timestamp_`` attribute of the transformer is compared across fits to
    check that later fits reuse the first fitted transformer.
    """
    X = iris.data
    y = iris.target
    cachedir = mkdtemp()
    try:
        memory = joblib.Memory(location=cachedir, verbose=10)
        # Test with transformer + logistic regression
        clf = LogisticRegression(random_state=0)
        transf = DummyTransf()
        # The non-cached pipeline gets its own clone of the transformer; the
        # classifier instance is shared by both pipelines.
        pipe = Pipeline([("transf", clone(transf)), ("logreg", clf)])
        cached_pipe = Pipeline([("transf", transf), ("logreg", clf)], memory=memory)

        # Memoize the transformer at the first fit
        cached_pipe.fit(X, y)
        pipe.fit(X, y)
        # Get the time stamp of the transformer in the cached pipeline
        ts = cached_pipe.named_steps["transf"].timestamp_
        # Check that cached_pipe and pipe yield identical results
        assert_array_equal(pipe.predict(X), cached_pipe.predict(X))
        assert_array_equal(pipe.predict_proba(X), cached_pipe.predict_proba(X))
        assert_array_equal(pipe.predict_log_proba(X), cached_pipe.predict_log_proba(X))
        assert_array_equal(pipe.score(X, y), cached_pipe.score(X, y))
        assert_array_equal(
            pipe.named_steps["transf"].means_, cached_pipe.named_steps["transf"].means_
        )
        # The transformer instance handed to the cached pipeline must not have
        # gained the fitted attribute itself.
        assert not hasattr(transf, "means_")
        # Check that we are reading the cache while fitting
        # a second time
        cached_pipe.fit(X, y)
        # Check that cached_pipe and pipe yield identical results
        assert_array_equal(pipe.predict(X), cached_pipe.predict(X))
        assert_array_equal(pipe.predict_proba(X), cached_pipe.predict_proba(X))
        assert_array_equal(pipe.predict_log_proba(X), cached_pipe.predict_log_proba(X))
        assert_array_equal(pipe.score(X, y), cached_pipe.score(X, y))
        assert_array_equal(
            pipe.named_steps["transf"].means_, cached_pipe.named_steps["transf"].means_
        )
        # An unchanged timestamp shows the transformer from the first fit was
        # reused.
        assert ts == cached_pipe.named_steps["transf"].timestamp_
        # Create a new pipeline with cloned estimators
        # Check that even changing the name step does not affect the cache hit
        clf_2 = LogisticRegression(random_state=0)
        transf_2 = DummyTransf()
        cached_pipe_2 = Pipeline(
            [("transf_2", transf_2), ("logreg", clf_2)], memory=memory
        )
        cached_pipe_2.fit(X, y)

        # Check that cached_pipe and pipe yield identical results
        assert_array_equal(pipe.predict(X), cached_pipe_2.predict(X))
        assert_array_equal(pipe.predict_proba(X), cached_pipe_2.predict_proba(X))
        assert_array_equal(
            pipe.predict_log_proba(X), cached_pipe_2.predict_log_proba(X)
        )
        assert_array_equal(pipe.score(X, y), cached_pipe_2.score(X, y))
        assert_array_equal(
            pipe.named_steps["transf"].means_,
            cached_pipe_2.named_steps["transf_2"].means_,
        )
        # Same timestamp as the very first fit: the renamed step also hit the
        # cache.
        assert ts == cached_pipe_2.named_steps["transf_2"].timestamp_
    finally:
        # Always remove the temporary cache directory.
        shutil.rmtree(cachedir)


def test_make_pipeline_memory():
    """Check that ``make_pipeline`` forwards ``memory`` and defaults it to None."""
    cachedir = mkdtemp()
    try:
        memory = joblib.Memory(location=cachedir, verbose=10)
        pipeline = make_pipeline(DummyTransf(), SVC(), memory=memory)
        assert pipeline.memory is memory
        pipeline = make_pipeline(DummyTransf(), SVC())
        assert pipeline.memory is None
        assert len(pipeline) == 2
    finally:
        # Remove the temporary cache directory even when an assertion above
        # fails, so a failing run does not leave directories behind.
        shutil.rmtree(cachedir)


class FeatureNameSaver(BaseEstimator):
    """Test helper: a transformer that returns its input unchanged.

    ``fit`` passes the input through ``_check_feature_names`` with
    ``reset=True`` and ``get_feature_names_out`` echoes the names it is given.
    """

    def fit(self, X, y=None):
        """Run feature-name validation on ``X`` and return ``self``."""
        _check_feature_names(self, X, reset=True)
        return self

    def transform(self, X, y=None):
        """Return ``X`` unchanged."""
        return X

    def get_feature_names_out(self, input_features=None):
        """Return ``input_features`` unchanged."""
        return input_features


def test_features_names_passthrough():
    """Check pipeline.get_feature_names_out with passthrough"""
    dataset = load_iris()
    steps = [
        ("names", FeatureNameSaver()),
        ("pass", "passthrough"),
        ("clf", LogisticRegression()),
    ]
    pipe = Pipeline(steps=steps)
    pipe.fit(dataset.data, dataset.target)

    # The names must come out of the transformer part of the pipeline unchanged.
    names_out = pipe[:-1].get_feature_names_out(dataset.feature_names)
    assert_array_equal(names_out, dataset.feature_names)


def test_feature_names_count_vectorizer():
    """Check pipeline.get_feature_names_out with vectorizers"""
    pipe = Pipeline(steps=[("vect", CountVectorizer()), ("clf", LogisticRegression())])
    labels = ["pizza" in doc for doc in JUNK_FOOD_DOCS]
    pipe.fit(JUNK_FOOD_DOCS, labels)

    vocabulary = ["beer", "burger", "coke", "copyright", "pizza", "the"]
    transformers = pipe[:-1]
    assert_array_equal(transformers.get_feature_names_out(), vocabulary)

    # Input feature names passed by the caller have no effect on the result.
    transformers = pipe[:-1]
    assert_array_equal(
        transformers.get_feature_names_out("nonsense_is_ignored"), vocabulary
    )


def test_pipeline_feature_names_out_error_without_definition():
    """Check that error is raised when a transformer does not define
    `get_feature_names_out`."""
    dataset = load_iris()
    pipe = Pipeline(steps=[("notrans", NoTrans())])
    pipe.fit(dataset.data, dataset.target)

    expected_error = "does not provide get_feature_names_out"
    with pytest.raises(AttributeError, match=expected_error):
        pipe.get_feature_names_out()


def test_pipeline_param_error():
    """Check the error raised when ``sample_weight`` is passed to Pipeline.fit."""
    pipe = make_pipeline(LogisticRegression())
    expected_error = "Pipeline.fit does not accept the sample_weight parameter"
    with pytest.raises(ValueError, match=expected_error):
        pipe.fit([[0], [0]], [0, 1], sample_weight=[1, 1])


# Parameter grid for `test_verbose`: a generator of `(est, pattern, method)`
# triples. Each `(estimator, expected stdout regex)` pair below is combined with
# each of the method names "fit", "fit_transform" and "fit_predict"; the filter
# at the end keeps only the combinations that are actually usable.
parameter_grid_test_verbose = (
    (est, pattern, method)
    for (est, pattern), method in itertools.product(
        [
            # Pipeline: transformer followed by a final estimator.
            (
                Pipeline([("transf", Transf()), ("clf", FitParamT())]),
                r"\[Pipeline\].*\(step 1 of 2\) Processing transf.* total=.*\n"
                r"\[Pipeline\].*\(step 2 of 2\) Processing clf.* total=.*\n$",
            ),
            # Pipeline with a None step in the middle: still reported as a step.
            (
                Pipeline([("transf", Transf()), ("noop", None), ("clf", FitParamT())]),
                r"\[Pipeline\].*\(step 1 of 3\) Processing transf.* total=.*\n"
                r"\[Pipeline\].*\(step 2 of 3\) Processing noop.* total=.*\n"
                r"\[Pipeline\].*\(step 3 of 3\) Processing clf.* total=.*\n$",
            ),
            # Same as above with "passthrough" instead of None.
            (
                Pipeline(
                    [
                        ("transf", Transf()),
                        ("noop", "passthrough"),
                        ("clf", FitParamT()),
                    ]
                ),
                r"\[Pipeline\].*\(step 1 of 3\) Processing transf.* total=.*\n"
                r"\[Pipeline\].*\(step 2 of 3\) Processing noop.* total=.*\n"
                r"\[Pipeline\].*\(step 3 of 3\) Processing clf.* total=.*\n$",
            ),
            # Pipeline whose final step is None.
            (
                Pipeline([("transf", Transf()), ("clf", None)]),
                r"\[Pipeline\].*\(step 1 of 2\) Processing transf.* total=.*\n"
                r"\[Pipeline\].*\(step 2 of 2\) Processing clf.* total=.*\n$",
            ),
            # Pipeline whose first step is None.
            (
                Pipeline([("transf", None), ("mult", Mult())]),
                r"\[Pipeline\].*\(step 1 of 2\) Processing transf.* total=.*\n"
                r"\[Pipeline\].*\(step 2 of 2\) Processing mult.* total=.*\n$",
            ),
            # Pipeline whose first step is "passthrough".
            (
                Pipeline([("transf", "passthrough"), ("mult", Mult())]),
                r"\[Pipeline\].*\(step 1 of 2\) Processing transf.* total=.*\n"
                r"\[Pipeline\].*\(step 2 of 2\) Processing mult.* total=.*\n$",
            ),
            # FeatureUnion with two transformers.
            (
                FeatureUnion([("mult1", Mult()), ("mult2", Mult())]),
                r"\[FeatureUnion\].*\(step 1 of 2\) Processing mult1.* total=.*\n"
                r"\[FeatureUnion\].*\(step 2 of 2\) Processing mult2.* total=.*\n$",
            ),
            # FeatureUnion with dropped transformers: the expected output only
            # counts the remaining one ("step 1 of 1").
            (
                FeatureUnion([("mult1", "drop"), ("mult2", Mult()), ("mult3", "drop")]),
                r"\[FeatureUnion\].*\(step 1 of 1\) Processing mult2.* total=.*\n$",
            ),
        ],
        ["fit", "fit_transform", "fit_predict"],
    )
    # Keep a combination only when the estimator exposes the method ...
    if hasattr(est, method)
    # ... and skip fit_transform for pipelines whose last step is a FitParamT
    # (presumably because FitParamT has no transform; confirm against its
    # definition).
    and not (
        method == "fit_transform"
        and hasattr(est, "steps")
        and isinstance(est.steps[-1][1], FitParamT)
    )
)


@pytest.mark.parametrize("est, pattern, method", parameter_grid_test_verbose)
def test_verbose(est, method, pattern, capsys):
    """Check that fitting prints the expected log only when ``verbose=True``."""
    fit_method = getattr(est, method)
    X = [[1, 2, 3], [4, 5, 6]]
    y = [[7], [8]]

    # Silent mode: nothing may be written to stdout.
    est.set_params(verbose=False)
    fit_method(X, y)
    captured = capsys.readouterr()
    assert not captured.out, "Got output for verbose=False"

    # Verbose mode: stdout must match the expected pattern.
    est.set_params(verbose=True)
    fit_method(X, y)
    captured = capsys.readouterr()
    assert re.match(pattern, captured.out)


def test_n_features_in_pipeline():
    """Check that a pipeline delegates ``n_features_in_`` to its first step."""
    X = [[1, 2], [3, 4], [5, 6]]
    y = [0, 1, 2]

    # The attribute is absent before fitting and present afterwards.
    scaler = StandardScaler()
    classifier = HistGradientBoostingClassifier()
    pipe = make_pipeline(scaler, classifier)
    assert not hasattr(pipe, "n_features_in_")
    pipe.fit(X, y)
    assert pipe.n_features_in_ == scaler.n_features_in_
    assert scaler.n_features_in_ == 2

    # if the first step has the n_features_in attribute then the pipeline also
    # has it, even though it isn't fitted.
    scaler = StandardScaler()
    classifier = HistGradientBoostingClassifier()
    pipe = make_pipeline(scaler, classifier)
    scaler.fit(X, y)
    assert pipe.n_features_in_ == scaler.n_features_in_
    assert scaler.n_features_in_ == 2
    assert not hasattr(classifier, "n_features_in_")


def test_n_features_in_feature_union():
    """Check that FeatureUnion delegates ``n_features_in_`` to its first
    transformer."""
    X = [[1, 2], [3, 4], [5, 6]]
    y = [0, 1, 2]

    # The attribute is absent before fitting and present afterwards.
    scaler = StandardScaler()
    union = make_union(scaler)
    assert not hasattr(union, "n_features_in_")
    union.fit(X, y)
    assert union.n_features_in_ == scaler.n_features_in_
    assert scaler.n_features_in_ == 2

    # if the first step has the n_features_in attribute then the feature_union
    # also has it, even though it isn't fitted.
    scaler = StandardScaler()
    union = make_union(scaler)
    scaler.fit(X, y)
    assert union.n_features_in_ == scaler.n_features_in_
    assert scaler.n_features_in_ == 2


def test_feature_union_fit_params():
    """Check that FeatureUnion forwards fit params to its transformers.

    Regression test for issue: #15117
    """

    class DummyTransformer(TransformerMixin, BaseEstimator):
        # Fitting only succeeds when exactly ``a=0`` is received.
        def fit(self, X, y=None, **fit_params):
            if fit_params == {"a": 0}:
                return self
            raise ValueError

        def transform(self, X, y=None):
            return X

    X, y = iris.data, iris.target
    union = FeatureUnion(
        [("dummy0", DummyTransformer()), ("dummy1", DummyTransformer())]
    )

    # Without the expected parameter the transformers refuse to fit.
    with pytest.raises(ValueError):
        union.fit(X, y)

    with pytest.raises(ValueError):
        union.fit_transform(X, y)

    # With it, both entry points succeed.
    union.fit(X, y, a=0)
    union.fit_transform(X, y, a=0)


def test_feature_union_fit_params_without_fit_transform():
    """Check metadata forwarding to transformers lacking ``fit_transform``.

    Test that metadata is passed correctly to underlying transformers that don't
    implement a `fit_transform` method when SLEP6 is not enabled.
    """

    class DummyTransformer(ConsumingNoFitTransformTransformer):
        # Fitting only succeeds when exactly ``metadata=1`` is received.
        def fit(self, X, y=None, **fit_params):
            if fit_params == {"metadata": 1}:
                return self
            raise ValueError

    X, y = iris.data, iris.target
    transformers = [
        ("nofittransform0", DummyTransformer()),
        ("nofittransform1", DummyTransformer()),
    ]
    union = FeatureUnion(transformers)

    # A wrong metadata value reaches the transformers and makes them raise.
    with pytest.raises(ValueError):
        union.fit_transform(X, y, metadata=0)

    union.fit_transform(X, y, metadata=1)


def test_pipeline_missing_values_leniency():
    """Check that Pipeline leaves missing-value validation to its steps.

    Roughly 10% of the entries are replaced by NaN; the pipeline must still
    fit and score because the imputer step handles them.
    """
    X, y = iris.data.copy(), iris.target.copy()
    # Use a seeded generator so the NaN mask, and hence the test outcome, is
    # reproducible from run to run.
    rng = np.random.RandomState(0)
    mask = rng.choice([1, 0], X.shape, p=[0.1, 0.9]).astype(bool)
    X[mask] = np.nan
    pipe = make_pipeline(SimpleImputer(), LogisticRegression())
    assert pipe.fit(X, y).score(X, y) > 0.4


def test_feature_union_warns_unknown_transformer_weight():
    """Check the error raised when ``transformer_weights`` contains a key that
    is not present in ``transformer_list``."""
    X = [[1, 2], [3, 4], [5, 6]]
    y = [0, 1, 2]

    # The weights refer to "transformer" while the only step is named "transf".
    union = FeatureUnion(
        [("transf", Transf())], transformer_weights={"transformer": 1}
    )
    expected_msg = (
        'Attempting to weight transformer "transformer", '
        "but it is not present in transformer_list."
    )
    with pytest.raises(ValueError, match=expected_msg):
        union.fit(X, y)


@pytest.mark.parametrize("passthrough", [None, "passthrough"])
def test_pipeline_get_tags_none(passthrough):
    """Check the tags when the first transformer is None or 'passthrough'.

    Non-regression test for:
    https://github.com/scikit-learn/scikit-learn/issues/18815
    """
    pipe = make_pipeline(passthrough, SVC())
    tags = pipe.__sklearn_tags__()
    assert not tags.input_tags.pairwise


# FIXME: Replace this test with a full `check_estimator` once we have API only
# checks.
@pytest.mark.parametrize("Predictor", [MinimalRegressor, MinimalClassifier])
def test_search_cv_using_minimal_compatible_estimator(Predictor):
    """Check that estimators not inheriting from BaseEstimator work in a
    Pipeline (fit, predict and score)."""
    rng = np.random.RandomState(0)
    X = rng.randn(25, 2)
    y = np.array([0] * 5 + [1] * 20)

    steps = [("transformer", MinimalTransformer()), ("predictor", Predictor())]
    model = Pipeline(steps)
    model.fit(X, y)

    y_pred = model.predict(X)
    if is_classifier(model):
        # The classifier is expected to predict class 1 for every sample.
        assert_array_equal(y_pred, 1)
        expected_score = accuracy_score(y, y_pred)
    else:
        # The regressor is expected to predict the mean of y.
        assert_allclose(y_pred, y.mean())
        expected_score = r2_score(y, y_pred)
    assert model.score(X, y) == pytest.approx(expected_score)


def test_pipeline_check_if_fitted():
    """Check that ``check_is_fitted`` on a Pipeline fails before fit only."""

    class Estimator(BaseEstimator):
        def fit(self, X, y):
            self.fitted_ = True
            return self

    pipe = Pipeline([("clf", Estimator())])

    # Before fitting the check must fail ...
    with pytest.raises(NotFittedError):
        check_is_fitted(pipe)

    # ... and pass once the pipeline has been fitted.
    pipe.fit(iris.data, iris.target)
    check_is_fitted(pipe)


def test_feature_union_check_if_fitted():
    """Check __sklearn_is_fitted__ is defined correctly."""
    X = [[1, 2], [3, 4], [5, 6]]
    y = [0, 1, 2]

    def assert_fitted_only_after_fit(union):
        # Unfitted union fails the check; the same union passes after fit.
        with pytest.raises(NotFittedError):
            check_is_fitted(union)

        union.fit(X, y)
        check_is_fitted(union)

    assert_fitted_only_after_fit(FeatureUnion([("clf", MinimalTransformer())]))

    # passthrough is stateless
    check_is_fitted(FeatureUnion([("pass", "passthrough")]))

    # One stateful transformer is enough to require fitting.
    assert_fitted_only_after_fit(
        FeatureUnion([("clf", MinimalTransformer()), ("pass", "passthrough")])
    )


def test_pipeline_get_feature_names_out_passes_names_through():
    """Check that pipeline passes names through.

    Non-regression test for #21349.
    """

    class AddPrefixStandardScalar(StandardScaler):
        # StandardScaler whose output names get a "my_prefix_" prefix.
        def get_feature_names_out(self, input_features=None):
            names = super().get_feature_names_out(input_features=input_features)
            return np.asarray([f"my_prefix_{name}" for name in names], dtype=object)

    pipe = make_pipeline(AddPrefixStandardScalar(), StandardScaler())
    pipe.fit(iris.data, iris.target)

    input_names = iris.feature_names
    expected_names = [f"my_prefix_{name}" for name in input_names]

    # The names produced by the first step must come out of the last step.
    assert_array_equal(pipe.get_feature_names_out(input_names), expected_names)


def test_pipeline_set_output_integration():
    """Test pipeline's set_output with feature names."""
    pytest.importorskip("pandas")

    X, y = load_iris(as_frame=True, return_X_y=True)

    pipe = make_pipeline(StandardScaler(), LogisticRegression())
    pipe.set_output(transform="pandas")
    pipe.fit(X, y)

    # Names emitted by the transformer part must be the names the final
    # estimator recorded at fit time.
    names_from_transformers = pipe[:-1].get_feature_names_out()
    names_seen_by_classifier = pipe[-1].feature_names_in_
    assert_array_equal(names_from_transformers, names_seen_by_classifier)


def test_feature_union_set_output():
    """Test feature union with set_output API."""
    pd = pytest.importorskip("pandas")

    X, _ = load_iris(as_frame=True, return_X_y=True)
    X_train, X_test = train_test_split(X, random_state=0)

    union = FeatureUnion([("scalar", StandardScaler()), ("pca", PCA())])
    union.set_output(transform="pandas")
    union.fit(X_train)
    transformed = union.transform(X_test)

    # The output is a DataFrame with the union's feature names as columns and
    # the index of the transformed input.
    assert isinstance(transformed, pd.DataFrame)
    assert_array_equal(transformed.columns, union.get_feature_names_out())
    assert_array_equal(transformed.index, X_test.index)


def test_feature_union_getitem():
    """Check FeatureUnion.__getitem__ returns expected results."""
    scaler, pca = StandardScaler(), PCA()
    transformer_list = [
        ("scalar", scaler),
        ("pca", pca),
        ("pass", "passthrough"),
        ("drop_me", "drop"),
    ]
    union = FeatureUnion(transformer_list)

    # Estimators come back as the very same objects ...
    assert union["scalar"] is scaler
    assert union["pca"] is pca
    # ... and the special string values come back as given.
    assert union["pass"] == "passthrough"
    assert union["drop_me"] == "drop"


@pytest.mark.parametrize("key", [0, slice(0, 2)])
def test_feature_union_getitem_error(key):
    """Raise error when __getitem__ gets a non-string input."""
    transformer_list = [("scalar", StandardScaler()), ("pca", PCA())]
    union = FeatureUnion(transformer_list)

    expected_error = "Only string keys are supported"
    with pytest.raises(KeyError, match=expected_error):
        union[key]


def test_feature_union_feature_names_in_():
    """Ensure feature union has `.feature_names_in_` attribute if `X` has a
    `columns` attribute.

    Test for #24754.
    """
    pytest.importorskip("pandas")

    frame, _ = load_iris(as_frame=True, return_X_y=True)

    # FeatureUnion should have the feature_names_in_ attribute if the
    # first transformer also has it
    scaler = StandardScaler()
    scaler.fit(frame)
    union = FeatureUnion([("scale", scaler)])
    assert hasattr(union, "feature_names_in_")
    assert_array_equal(frame.columns, union.feature_names_in_)
    assert_array_equal(scaler.feature_names_in_, union.feature_names_in_)

    # fit with pandas.DataFrame
    union = FeatureUnion([("pass", "passthrough")])
    union.fit(frame)
    assert hasattr(union, "feature_names_in_")
    assert_array_equal(frame.columns, union.feature_names_in_)

    # fit with numpy array
    union = FeatureUnion([("pass", "passthrough")])
    union.fit(frame.to_numpy())
    assert not hasattr(union, "feature_names_in_")


def test_feature_union_1d_output():
    """Test that FeatureUnion raises error for 1D transformer outputs."""
    X = np.arange(6).reshape(3, 2)

    # Transformer "a" keeps X two-dimensional; "b" returns a single column as
    # a one-dimensional array.
    transformer_list = [
        ("a", FunctionTransformer(lambda X: X)),
        ("b", FunctionTransformer(lambda X: X[:, 1])),
    ]
    expected_error = "Transformer 'b' returned an array or dataframe with 1 dimensions"

    with pytest.raises(ValueError, match=expected_error):
        FeatureUnion(transformer_list).fit_transform(X)


# transform_input tests
# =====================


@config_context(enable_metadata_routing=True)
@pytest.mark.parametrize("method", ["fit", "fit_transform"])
def test_transform_input_pipeline(method):
    """Check the metadata each step records when `transform_input` is used.

    NOTE(review): the pipeline is always fitted through `pipe.fit`; `method`
    only selects which recorded methods are checked for the last step. Confirm
    that this is intended.
    """

    def make_step(registry, sample_weight, metadata):
        """Build a `ConsumingTransformer` with fit and transform requests set."""
        step = ConsumingTransformer(registry=registry)
        step = step.set_fit_request(sample_weight=sample_weight, metadata=metadata)
        return step.set_transform_request(
            sample_weight=sample_weight, metadata=metadata
        )

    def build_pipeline():
        """Return a 4-step pipeline followed by the registry of each step.

        The steps use different request values to cover different cases; the
        last one aliases `sample_weight` to `other_weights`.
        """
        registries = [_Registry() for _ in range(4)]
        sample_weight_requests = [True, False, True, "other_weights"]
        metadata_requests = [True, False, True, True]
        steps = [
            make_step(registry, sample_weight=sw_request, metadata=md_request)
            for registry, sw_request, md_request in zip(
                registries, sample_weight_requests, metadata_requests
            )
        ]
        pipeline = make_pipeline(*steps, transform_input=["sample_weight"])
        return (pipeline, *registries)

    def assert_recorded(registry, methods, **metadata):
        """Check what every estimator in `registry` recorded for `methods`."""
        assert registry
        for estimator in registry:
            for recorded_method in methods:
                check_recorded_metadata(
                    estimator,
                    method=recorded_method,
                    parent=recorded_method,
                    **metadata,
                )

    X = np.array([[1, 2], [3, 4]])
    y = np.array([0, 1])
    sample_weight = np.array([[1, 2]])
    other_weights = np.array([[30, 40]])
    metadata = np.array([[100, 200]])

    pipe, registry_1, registry_2, registry_3, registry_4 = build_pipeline()
    fit_params = {
        "sample_weight": sample_weight,
        "other_weights": other_weights,
        "metadata": metadata,
    }
    pipe.fit(X, y, **fit_params)

    fit_and_transform = ["fit", "transform"]
    assert_recorded(
        registry_1, fit_and_transform, sample_weight=sample_weight, metadata=metadata
    )
    # The second step requests nothing, hence no metadata is expected.
    assert_recorded(registry_2, fit_and_transform)
    # The expected offsets (+2, +3) presumably reflect each preceding step's
    # `transform` adding 1 to the transformed input -- TODO confirm against
    # `ConsumingTransformer.transform`.
    assert_recorded(
        registry_3,
        fit_and_transform,
        sample_weight=sample_weight + 2,
        metadata=metadata,
    )
    assert_recorded(
        registry_4,
        # "fit_transform" -> ["fit", "transform"]; "fit" -> ["fit"]
        method.split("_"),
        sample_weight=other_weights + 3,
        metadata=metadata,
    )


@config_context(enable_metadata_routing=True)
def test_transform_input_explicit_value_check():
    """Check the exact values the final step's `fit` receives.

    `X_val` is listed in `transform_input`, `y_val` is not.
    """

    class Transformer(TransformerMixin, BaseEstimator):
        """Transformer adding 1 to its input."""

        def fit(self, X, y):
            self.fitted_ = True
            return self

        def transform(self, X):
            return X + 1

    class Estimator(ClassifierMixin, BaseEstimator):
        """Final step asserting on the data it is fitted with."""

        def fit(self, X, y, X_val=None, y_val=None):
            # `X` and `X_val` are expected shifted by 1 (see `Transformer`),
            # while `y` and `y_val` are expected as passed to the pipeline.
            for received, expected in (
                (X, np.array([[1, 2]])),
                (y, np.array([0, 1])),
                (X_val, np.array([[2, 3]])),
                (y_val, np.array([0, 1])),
            ):
                assert_array_equal(received, expected)
            return self

    final_step = Estimator().set_fit_request(X_val=True, y_val=True)
    pipe = Pipeline(
        [("transformer", Transformer()), ("estimator", final_step)],
        transform_input=["X_val"],
    )
    pipe.fit(
        np.array([[0, 1]]),
        np.array([0, 1]),
        X_val=np.array([[1, 2]]),
        y_val=np.array([0, 1]),
    )


def test_transform_input_no_slep6():
    """Check the error raised with `transform_input` when slep6 is not enabled."""
    X, y = np.array([[1, 2], [3, 4]]), np.array([0, 1])
    expected_msg = "The `transform_input` parameter can only be set if metadata"

    with pytest.raises(ValueError, match=expected_msg):
        pipe = make_pipeline(DummyTransf(), transform_input=["blah"])
        pipe.fit(X, y)


@config_context(enable_metadata_routing=True)
def test_transform_tuple_input():
    """Check that each array of a tuple-valued metadata gets transformed."""

    class Transformer(TransformerMixin, BaseEstimator):
        """Transformer adding 1 to its input."""

        def fit(self, X, y):
            self.fitted_ = True
            return self

        def transform(self, X):
            return X + 1

    class Estimator(ClassifierMixin, BaseEstimator):
        """Final step asserting on the validation tuples it is fitted with."""

        def fit(self, X, y, X_val=None, y_val=None):
            assert isinstance(X_val, tuple)
            assert isinstance(y_val, tuple)
            # Every element of `X_val` must have gone through the transformer
            # (shifted by 1); the elements of `y_val` are expected as passed.
            expected_X_val = (np.array([[2, 3]]), np.array([[11, 12]]))
            expected_y_val = (np.array([0, 1]), np.array([1, 2]))
            for idx in range(2):
                assert_array_equal(X_val[idx], expected_X_val[idx])
                assert_array_equal(y_val[idx], expected_y_val[idx])
            self.fitted_ = True
            return self

    X, y = np.array([[1, 2]]), np.array([0, 1])
    X_vals = (np.array([[1, 2]]), np.array([[10, 11]]))
    y_vals = (np.array([0, 1]), np.array([1, 2]))

    final_step = Estimator().set_fit_request(X_val=True, y_val=True)
    pipe = Pipeline(
        [("transformer", Transformer()), ("estimator", final_step)],
        transform_input=["X_val"],
    )
    pipe.fit(X, y, X_val=X_vals, y_val=y_vals)


# end of transform_input tests
# =============================


@pytest.mark.parametrize(
    "method",
    [
        "predict",
        "predict_proba",
        "predict_log_proba",
        "decision_function",
        "score",
        "score_samples",
        "transform",
        "inverse_transform",
    ],
)
def test_pipeline_warns_not_fitted(method):
    """Check that calling `method` on an unfitted pipeline raises `NotFittedError`.

    The only step of the pipeline never checks whether it is fitted itself.
    """

    class StatelessEstimator(BaseEstimator):
        """Stateless estimator that doesn't check if it's fitted.

        Stateless estimators that don't require fit should properly set the
        `requires_fit` flag and implement a `__sklearn_is_fitted__` method
        returning `True`.
        """

        def fit(self, X, y):
            return self  # pragma: no cover

        # Pass-through transformations.
        def transform(self, X):
            return X

        def inverse_transform(self, X):
            return X

        # Constant outputs with one entry per sample.
        def predict(self, X):
            return np.ones(len(X))

        def predict_proba(self, X):
            return np.ones(len(X))

        def predict_log_proba(self, X):
            return np.zeros(len(X))

        def decision_function(self, X):
            return np.ones(len(X))

        def score_samples(self, X):
            return np.ones(len(X))

        # Constant scalar score.
        def score(self, X, y):
            return 1

    unfitted_pipe = Pipeline([("estimator", StatelessEstimator())])
    with pytest.raises(NotFittedError):
        getattr(unfitted_pipe, method)([[1]])


# Test that metadata is routed correctly for pipelines and FeatureUnion
# =====================================================================


class SimpleEstimator(BaseEstimator):
    """Estimator stub used to test metadata routing in the pipeline.

    Apart from `__sklearn_is_fitted__`, every method accepts `sample_weight`
    and `prop` and asserts that both were passed, i.e. that neither is `None`.
    The returned values are placeholders that do not depend on the metadata.
    """

    # This class is used in this section for testing routing in the pipeline.
    # This class should have every set_{method}_request
    def __sklearn_is_fitted__(self):
        # Always report the estimator as fitted.
        return True

    def fit(self, X, y, sample_weight=None, prop=None):
        assert sample_weight is not None, sample_weight
        assert prop is not None, prop
        return self

    def fit_transform(self, X, y, sample_weight=None, prop=None):
        assert sample_weight is not None
        assert prop is not None
        return X + 1

    def fit_predict(self, X, y, sample_weight=None, prop=None):
        assert sample_weight is not None
        assert prop is not None
        return np.ones(len(X))

    def predict(self, X, sample_weight=None, prop=None):
        assert sample_weight is not None
        assert prop is not None
        return np.ones(len(X))

    def predict_proba(self, X, sample_weight=None, prop=None):
        assert sample_weight is not None
        assert prop is not None
        return np.ones(len(X))

    def predict_log_proba(self, X, sample_weight=None, prop=None):
        assert sample_weight is not None
        assert prop is not None
        return np.zeros(len(X))

    def decision_function(self, X, sample_weight=None, prop=None):
        assert sample_weight is not None
        assert prop is not None
        return np.ones(len(X))

    def score(self, X, y, sample_weight=None, prop=None):
        assert sample_weight is not None
        assert prop is not None
        return 1

    def transform(self, X, sample_weight=None, prop=None):
        assert sample_weight is not None
        assert prop is not None
        return X + 1

    def inverse_transform(self, X, sample_weight=None, prop=None):
        # Mirror of `transform`, which returns `X + 1`.
        assert sample_weight is not None
        assert prop is not None
        return X - 1


# split and partial_fit not relevant for pipelines
@pytest.mark.parametrize("method", sorted(set(METHODS) - {"split", "partial_fit"}))
@config_context(enable_metadata_routing=True)
def test_metadata_routing_for_pipeline(method):
    """Check that requested metadata reaches the steps of a pipeline."""

    def set_request(est, method, **kwarg):
        """Set the given requests on `est` for `method` and return `est`.

        A composite method is expanded into the methods composing it, and the
        same requests are set for each of them.
        """
        simple_methods = (
            COMPOSITE_METHODS[method] if method in COMPOSITE_METHODS else [method]
        )
        for simple_method in simple_methods:
            request_setter = getattr(est, f"set_{simple_method}_request")
            request_setter(**kwarg)
        return est

    X, y = np.array([[1]]), np.array([1])
    sample_weight, prop, metadata = [1], "a", "b"

    # The final estimator requests metadata for the tested method and for `fit`.
    est = set_request(SimpleEstimator(), method, sample_weight=True, prop=True)
    est = set_request(est, "fit", sample_weight=True, prop=True)

    trs = ConsumingTransformer()
    trs = trs.set_fit_request(sample_weight=True, metadata=True)
    trs = trs.set_transform_request(sample_weight=True, metadata=True)
    trs = trs.set_inverse_transform_request(sample_weight=True, metadata=True)

    pipeline = Pipeline([("trs", trs), ("estimator", est)])

    # Methods that do not fit themselves are called on a fitted pipeline.
    if "fit" not in method:
        pipeline = pipeline.fit(X, y, sample_weight=sample_weight, prop=prop)

    routed = {"sample_weight": sample_weight, "prop": prop, "metadata": metadata}
    try:
        getattr(pipeline, method)(X, y, **routed)
    except TypeError:
        # Some methods don't accept y
        getattr(pipeline, method)(X, **routed)

    # Make sure the transformer has received the metadata.
    # For the transformer, always only `fit` and `transform` are called.
    for recorded_method in ("fit", "transform"):
        check_recorded_metadata(
            obj=trs,
            method=recorded_method,
            parent=recorded_method,
            sample_weight=sample_weight,
            metadata=metadata,
        )


# split and partial_fit not relevant for pipelines
# `sorted` is needed here to make `pytest -nX` work: without it, tests are
# collected in different orders between workers, which makes the run fail.
@pytest.mark.parametrize("method", sorted(set(METHODS) - {"split", "partial_fit"}))
@config_context(enable_metadata_routing=True)
def test_metadata_routing_error_for_pipeline(method):
    """Check that passing metadata that was not requested raises an error."""
    X, y = [[1]], [1]
    unrequested = {"sample_weight": [1], "prop": "a"}

    # No request is set on the estimator: they are all left as None.
    pipeline = Pipeline([("estimator", SimpleEstimator())])

    error_message = (
        "[sample_weight, prop] are passed but are not explicitly set as requested"
        f" or not requested for SimpleEstimator.{method}"
    )
    with pytest.raises(ValueError, match=re.escape(error_message)):
        try:
            # First attempt: X and y as the two positional arguments.
            getattr(pipeline, method)(X, y, **unrequested)
        except TypeError:
            # Not all methods accept y (like `predict`): fall back to passing
            # only X as a positional argument.
            getattr(pipeline, method)(X, **unrequested)


@pytest.mark.parametrize(
    "method", ["decision_function", "transform", "inverse_transform"]
)
def test_routing_passed_metadata_not_supported(method):
    """Check the error raised when metadata is passed to `method` while
    `enable_metadata_routing=False`, in which case it is not supported."""
    pipe = Pipeline([("estimator", SimpleEstimator())])
    expected_msg = "is only supported if enable_metadata_routing=True"

    with pytest.raises(ValueError, match=expected_msg):
        getattr(pipe, method)([[1]], sample_weight=[1], prop="a")


@config_context(enable_metadata_routing=True)
def test_pipeline_with_estimator_with_len():
    """Check that a pipeline works with estimators defining a `__len__` method."""
    X, y = [[1]], [1]
    steps = [
        ("trs", RandomTreesEmbedding()),
        ("estimator", RandomForestClassifier()),
    ]
    # Fitting and predicting must simply run without raising.
    Pipeline(steps).fit(X, y).predict(X)


@pytest.mark.parametrize("last_step", [None, "passthrough"])
@config_context(enable_metadata_routing=True)
def test_pipeline_with_no_last_step(last_step):
    """Check that the pipeline works when there is no last step.

    On transform, the missing step should be ignored and the data passed
    through.
    """
    steps = [("trs", FunctionTransformer()), ("estimator", last_step)]
    fitted_pipe = Pipeline(steps).fit([[1]], [1])

    X_new = [[1], [2], [3]]
    assert fitted_pipe.transform(X_new) == [[1], [2], [3]]


@config_context(enable_metadata_routing=True)
def test_feature_union_metadata_routing_error():
    """Check the error raised when passed metadata was not requested."""

    def expected_pattern(method_name):
        """Return the escaped error message expected for `method_name`."""
        return re.escape(
            "[sample_weight, metadata] are passed but are not explicitly set as"
            f" requested or not requested for {ConsumingTransformer.__name__}"
            f".{method_name}"
        )

    X = np.array([[0, 1], [2, 2], [4, 6]])
    y = [1, 2, 3]
    routed = {"sample_weight": [1, 1, 1], "metadata": "a"}

    # Case 1: the sub-transformer lacks `set_fit_request`.
    feature_union = FeatureUnion([("sub_transformer", ConsumingTransformer())])
    with pytest.raises(UnsetMetadataPassedError, match=expected_pattern("fit")):
        feature_union.fit(X, y, **routed)

    # Case 2: fit requests are set, but `set_transform_request` is lacking.
    fit_only_requester = ConsumingTransformer().set_fit_request(
        sample_weight=True, metadata=True
    )
    feature_union = FeatureUnion([("sub_transformer", fit_only_requester)])
    with pytest.raises(
        UnsetMetadataPassedError, match=expected_pattern("transform")
    ):
        feature_union.fit(X, y, **routed).transform(X, **routed)


@config_context(enable_metadata_routing=True)
def test_feature_union_get_metadata_routing_without_fit():
    """Check that `get_metadata_routing()` can be called on an unfitted
    `FeatureUnion`, regardless of the metadata its child consumes."""
    sub_transformer = ConsumingTransformer()
    FeatureUnion([("sub_transformer", sub_transformer)]).get_metadata_routing()


@config_context(enable_metadata_routing=True)
@pytest.mark.parametrize(
    "transformer", [ConsumingTransformer, ConsumingNoFitTransformTransformer]
)
def test_feature_union_metadata_routing(transformer):
    """Check that requested metadata reaches the sub-transformers of a
    `FeatureUnion`."""

    def make_sub_transformer():
        """Build a sub-transformer with its own registry and requests set."""
        return (
            transformer(registry=_Registry())
            .set_fit_request(sample_weight=True, metadata=True)
            .set_transform_request(sample_weight=True, metadata=True)
        )

    X = np.array([[0, 1], [2, 2], [4, 6]])
    y = [1, 2, 3]
    kwargs = {"sample_weight": [1, 1, 1], "metadata": "a"}

    feature_union = FeatureUnion(
        [(name, make_sub_transformer()) for name in ("sub_trans1", "sub_trans2")]
    )

    # Exercise the three entry points with the same metadata.
    feature_union.fit(X, y, **kwargs)
    feature_union.fit_transform(X, y, **kwargs)
    feature_union.fit(X, y, **kwargs).transform(X, **kwargs)

    # `transformer_list` holds `(name, transformer)` pairs.
    for _, sub_transformer in feature_union.transformer_list:
        registry = sub_transformer.registry
        assert len(registry)
        for recorded_estimator in registry:
            check_recorded_metadata(
                obj=recorded_estimator,
                method="fit",
                parent="fit",
                **kwargs,
            )


# End of routing tests
# ====================
